D:\a\scloud-dns\scloud-dns\src\workers\mod.rs
Line | Count | Source |
1 | | use crate::exceptions::SCloudException; |
2 | | use crate::workers::manager::StartGate; |
3 | | use crate::workers::task::InFlightTask; |
4 | | use crate::{log_error, log_info, log_sdebug, log_strace}; |
5 | | use anyhow::Result; |
6 | | use serde::{Deserialize, Serialize}; |
7 | | use std::sync::Arc; |
8 | | use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering}; |
9 | | use tokio::sync::{Mutex, MutexGuard, Semaphore, mpsc}; |
10 | | |
11 | | pub(crate) mod manager; |
12 | | pub(crate) mod queue; |
13 | | pub(crate) mod task; |
14 | | pub(crate) mod tests; |
15 | | pub(crate) mod types; |
16 | | |
17 | | #[allow(non_camel_case_types)] |
18 | | #[derive(Debug)] |
19 | | pub(crate) struct SCloudWorker { |
20 | | // IDENTITY |
21 | | pub(crate) worker_id: AtomicU64, |
22 | | pub(crate) worker_type: AtomicU8, |
23 | | |
24 | | // CHANNEL |
25 | | pub(crate) dns_tx: Mutex<Vec<mpsc::Sender<InFlightTask>>>, |
26 | | pub(crate) dns_rx: Mutex<Vec<mpsc::Receiver<InFlightTask>>>, |
27 | | |
28 | | // RESOURCES/LIMITS |
29 | | pub(crate) stack_size_bytes: AtomicUsize, |
30 | | pub(crate) buffer_budget_bytes: AtomicUsize, |
31 | | pub(crate) max_stack_size_bytes: AtomicUsize, |
32 | | pub(crate) max_buffer_budget_bytes: AtomicUsize, |
33 | | |
34 | | // RUNTIME STATE |
35 | | pub(crate) state: AtomicU8, |
36 | | pub(crate) shutdown_requested: AtomicBool, |
37 | | pub(crate) shutdown_mode: AtomicU8, |
38 | | |
39 | | // BACKPRESSURE/IN-FLIGHT |
40 | | pub(crate) in_flight: AtomicUsize, // for metrics |
41 | | pub(crate) in_flight_sem: Arc<Semaphore>, |
42 | | pub(crate) max_in_flight: AtomicUsize, |
43 | | |
44 | | // METRICS |
45 | | pub(crate) jobs_done: AtomicU64, |
46 | | pub(crate) jobs_failed: AtomicU64, |
47 | | pub(crate) jobs_retried: AtomicU64, |
48 | | |
49 | | pub(crate) last_job_started_ms: AtomicU64, |
50 | | pub(crate) last_job_finished_ms: AtomicU64, |
51 | | |
52 | | pub(crate) last_error_code: AtomicU64, |
53 | | pub(crate) last_error_at_ms: AtomicU64, |
54 | | |
55 | | // CORRELATION/TRACING |
56 | | pub(crate) last_task_id_hi: AtomicU64, // 128-bit UUID split |
57 | | pub(crate) last_task_id_lo: AtomicU64, |
58 | | } |
59 | | |
60 | | impl SCloudWorker { |
61 | | const NEVER_APPLIED: u8 = 0xFF; |
62 | | |
63 | 69 | pub(crate) fn new(worker_type: WorkerType) -> Result<Self, SCloudException> { |
64 | 69 | Ok(Self { |
65 | 69 | worker_id: AtomicU64::new(manager::generate_worker_id()), |
66 | 69 | worker_type: AtomicU8::new(worker_type as u8), |
67 | 69 | dns_tx: Mutex::new(Vec::new()), |
68 | 69 | dns_rx: Mutex::new(Vec::new()), |
69 | 69 | stack_size_bytes: AtomicUsize::new(2 * 1024 * 1024), |
70 | 69 | buffer_budget_bytes: AtomicUsize::new(4 * 1024 * 1024), |
71 | 69 | max_stack_size_bytes: AtomicUsize::new(32 * 1024 * 1024), |
72 | 69 | max_buffer_budget_bytes: AtomicUsize::new(256 * 1024 * 1024), |
73 | 69 | state: AtomicU8::new(WorkerState::INIT as u8), |
74 | 69 | shutdown_requested: AtomicBool::new(false), |
75 | 69 | shutdown_mode: AtomicU8::new(ShutdownMode::GRACEFUL as u8), |
76 | 69 | in_flight: AtomicUsize::new(0), |
77 | 69 | in_flight_sem: Arc::new(Semaphore::new(512)), |
78 | 69 | max_in_flight: AtomicUsize::new(512), |
79 | 69 | jobs_done: AtomicU64::new(0), |
80 | 69 | jobs_failed: AtomicU64::new(0), |
81 | 69 | jobs_retried: AtomicU64::new(0), |
82 | 69 | last_job_started_ms: AtomicU64::new(0), |
83 | 69 | last_job_finished_ms: AtomicU64::new(0), |
84 | 69 | last_error_code: AtomicU64::new(0), |
85 | 69 | last_error_at_ms: AtomicU64::new(0), |
86 | 69 | last_task_id_hi: AtomicU64::new(0), |
87 | 69 | last_task_id_lo: AtomicU64::new(0), |
88 | 69 | }) |
89 | 69 | } |
90 | | |
91 | 18 | pub async fn run(self: Arc<Self>, gate: Option<Arc<StartGate>>) -> Result<(), SCloudException> { |
92 | 18 | log_sdebug!( |
93 | | "Running SCloudWorker [ID: {}][TYPE: {:?}]", |
94 | 18 | self.get_worker_id(), |
95 | 18 | self.get_worker_type() |
96 | | ); |
97 | | |
98 | 18 | if let Some(g) = gate { |
99 | 18 | g.done().await; |
100 | 0 | } |
101 | 18 | match WorkerType::try_from(self.worker_type.load(Ordering::Relaxed)).unwrap() { |
102 | | WorkerType::LISTENER => { |
103 | 2 | return Err(SCloudException::SCLOUD_WORKER_LISTENER_NO_SOCKET); |
104 | | } |
105 | | WorkerType::DECODER => { |
106 | 2 | self.clone().set_state(WorkerState::IDLE); |
107 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
108 | 0 | types::decoder::run_dns_decoder(self.clone(), rx, tx).await?; |
109 | | } |
110 | | WorkerType::QUERY_DISPATCHER => { |
111 | 2 | self.clone().set_state(WorkerState::IDLE); |
112 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
113 | 0 | types::query_dispatcher::run_dns_query_dispatcher(self.clone(), rx, tx).await?; |
114 | | } |
115 | | WorkerType::CACHE_LOOKUP => { |
116 | 2 | self.clone().set_state(WorkerState::IDLE); |
117 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
118 | 0 | types::cache_lookup::run_dns_cache_lookup(self.clone(), rx, tx).await?; |
119 | | } |
120 | | WorkerType::ZONE_MANAGER => { |
121 | 2 | self.clone().set_state(WorkerState::IDLE); |
122 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
123 | 0 | types::zone_manager::run_dns_zone_manager(self.clone(), rx, tx).await?; |
124 | | } |
125 | | WorkerType::RESOLVER => { |
126 | 2 | self.clone().set_state(WorkerState::IDLE); |
127 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
128 | 0 | types::resolver::run_dns_resolver(self.clone(), rx, tx).await?; |
129 | | } |
130 | | WorkerType::CACHE_WRITER => { |
131 | 2 | self.clone().set_state(WorkerState::IDLE); |
132 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
133 | 0 | types::cache_writer::run_dns_cache_writer(self.clone(), rx, tx).await?; |
134 | | } |
135 | | WorkerType::ENCODER => { |
136 | 2 | self.clone().set_state(WorkerState::IDLE); |
137 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
138 | 0 | types::encoder::run_dns_encoder(self.clone(), rx, tx).await?; |
139 | | } |
140 | | WorkerType::SENDER => { |
141 | 1 | self.clone().set_state(WorkerState::IDLE); |
142 | 1 | let rx0 = self.get_dns_rx().await?; |
143 | 0 | types::sender::run_dns_sender(self.clone(), rx).await?; |
144 | | } |
145 | | WorkerType::CACHE_JANITOR => { |
146 | 0 | self.clone().set_state(WorkerState::IDLE); |
147 | 0 | types::cache_janitor::run_dns_cache_janitor(self.clone()).await?; |
148 | | } |
149 | | WorkerType::METRICS => { |
150 | 0 | self.clone().set_state(WorkerState::IDLE); |
151 | 0 | types::metrics::start_otlp_logger().await; |
152 | | } |
153 | | WorkerType::TCP_ACCEPTOR => { |
154 | 1 | self.clone().set_state(WorkerState::IDLE); |
155 | 1 | let tx0 = self.get_dns_tx().await?; |
156 | 0 | types::tcp_acceptor::run_dns_tcp_acceptor(self.clone(), tx).await?; |
157 | | } |
158 | 0 | _ => {} |
159 | | } |
160 | 0 | Ok(()) |
161 | 18 | } |
162 | | |
163 | | #[inline] |
164 | 41 | pub fn get_worker_id(&self) -> u64 { |
165 | 41 | self.worker_id.load(Ordering::Relaxed) |
166 | 41 | } |
167 | | |
168 | | #[inline] |
169 | 50 | pub fn get_worker_type(&self) -> WorkerType { |
170 | 50 | WorkerType::try_from(self.worker_type.load(Ordering::Relaxed)).unwrap() |
171 | 50 | } |
172 | | |
173 | | #[inline] |
174 | 0 | pub async fn push_dns_rx(&self, rx: mpsc::Receiver<InFlightTask>) { |
175 | 0 | self.dns_rx.lock().await.push(rx); |
176 | 0 | } |
177 | | |
178 | | #[inline] |
179 | 0 | pub async fn push_dns_tx_many(&self, txs: Vec<mpsc::Sender<InFlightTask>>) { |
180 | 0 | self.dns_tx.lock().await.extend(txs); |
181 | 0 | } |
182 | | |
183 | | #[inline] |
184 | 15 | pub async fn get_dns_rx_tx( |
185 | 15 | &self, |
186 | 15 | ) -> Result< |
187 | 15 | ( |
188 | 15 | Vec<mpsc::Receiver<InFlightTask>>, |
189 | 15 | Vec<mpsc::Sender<InFlightTask>>, |
190 | 15 | ), |
191 | 15 | SCloudException, |
192 | 15 | > { |
193 | 15 | Ok((self.get_dns_rx().await?7 , self8 .get_dns_tx().await?7 )) |
194 | 15 | } |
195 | | |
196 | | #[inline] |
197 | 10 | pub async fn get_dns_tx(&self) -> Result<Vec<mpsc::Sender<InFlightTask>>, SCloudException> { |
198 | 10 | let mut guard = self.dns_tx.lock().await; |
199 | 10 | if guard.is_empty() { |
200 | 8 | return Err(SCloudException::SCLOUD_WORKER_TX_NOT_SET); |
201 | 2 | } |
202 | 2 | Ok(std::mem::take(&mut *guard)) |
203 | 10 | } |
204 | | |
205 | | #[inline] |
206 | 17 | pub async fn get_dns_rx(&self) -> Result<Vec<mpsc::Receiver<InFlightTask>>, SCloudException> { |
207 | 17 | let mut guard = self.dns_rx.lock().await; |
208 | 17 | if guard.is_empty() { |
209 | 8 | return Err(SCloudException::SCLOUD_WORKER_RX_NOT_SET); |
210 | 9 | } |
211 | 9 | Ok(std::mem::take(&mut *guard)) |
212 | 17 | } |
213 | | |
214 | | #[inline] |
215 | 2 | pub fn get_stack_size_bytes(&self) -> usize { |
216 | 2 | self.stack_size_bytes.load(Ordering::Relaxed) |
217 | 2 | } |
218 | | |
219 | | #[inline] |
220 | 2 | pub fn get_buffer_budget_bytes(&self) -> usize { |
221 | 2 | self.buffer_budget_bytes.load(Ordering::Relaxed) |
222 | 2 | } |
223 | | |
224 | | #[inline] |
225 | 2 | pub fn get_max_stack_size_bytes(&self) -> usize { |
226 | 2 | self.max_stack_size_bytes.load(Ordering::Relaxed) |
227 | 2 | } |
228 | | |
229 | | #[inline] |
230 | 2 | pub fn get_max_buffer_budget_bytes(&self) -> usize { |
231 | 2 | self.max_buffer_budget_bytes.load(Ordering::Relaxed) |
232 | 2 | } |
233 | | |
234 | | #[inline] |
235 | 7 | pub fn get_state(&self) -> u8 { |
236 | 7 | self.state.load(Ordering::Acquire) |
237 | 7 | } |
238 | | |
239 | | #[inline] |
240 | 2 | pub fn get_shutdown_requested(&self) -> bool { |
241 | 2 | self.shutdown_requested.load(Ordering::Acquire) |
242 | 2 | } |
243 | | |
244 | | #[inline] |
245 | 3 | pub fn get_shutdown_mode(&self) -> u8 { |
246 | 3 | self.shutdown_mode.load(Ordering::Acquire) |
247 | 3 | } |
248 | | |
249 | | #[inline] |
250 | 2 | pub fn get_in_flight(&self) -> usize { |
251 | 2 | self.in_flight.load(Ordering::Relaxed) |
252 | 2 | } |
253 | | |
254 | | #[inline] |
255 | 1 | pub fn get_in_flight_sem(&self) -> usize { |
256 | 1 | self.in_flight_sem.available_permits() |
257 | 1 | } |
258 | | |
259 | | #[inline] |
260 | 3 | pub fn get_max_in_flight(&self) -> usize { |
261 | 3 | self.max_in_flight.load(Ordering::Relaxed) |
262 | 3 | } |
263 | | |
264 | | #[inline] |
265 | 2 | pub fn get_jobs_done(&self) -> u64 { |
266 | 2 | self.jobs_done.load(Ordering::Relaxed) |
267 | 2 | } |
268 | | |
269 | | #[inline] |
270 | 2 | pub fn get_jobs_failed(&self) -> u64 { |
271 | 2 | self.jobs_failed.load(Ordering::Relaxed) |
272 | 2 | } |
273 | | |
274 | | #[inline] |
275 | 2 | pub fn get_jobs_retried(&self) -> u64 { |
276 | 2 | self.jobs_retried.load(Ordering::Relaxed) |
277 | 2 | } |
278 | | |
279 | | #[inline] |
280 | 2 | pub fn get_last_job_started_ms(&self) -> u64 { |
281 | 2 | self.last_job_started_ms.load(Ordering::Relaxed) |
282 | 2 | } |
283 | | |
284 | | #[inline] |
285 | 2 | pub fn get_last_job_finished_ms(&self) -> u64 { |
286 | 2 | self.last_job_finished_ms.load(Ordering::Relaxed) |
287 | 2 | } |
288 | | |
289 | | #[inline] |
290 | 2 | pub fn get_last_error_code(&self) -> u64 { |
291 | 2 | self.last_error_code.load(Ordering::Relaxed) |
292 | 2 | } |
293 | | |
294 | | #[inline] |
295 | 2 | pub fn get_last_error_at_ms(&self) -> u64 { |
296 | 2 | self.last_error_at_ms.load(Ordering::Relaxed) |
297 | 2 | } |
298 | | |
299 | | #[inline] |
300 | 2 | pub fn get_last_task_id_hi(&self) -> u64 { |
301 | 2 | self.last_task_id_hi.load(Ordering::Relaxed) |
302 | 2 | } |
303 | | |
304 | | #[inline] |
305 | 2 | pub fn get_last_task_id_lo(&self) -> u64 { |
306 | 2 | self.last_task_id_lo.load(Ordering::Relaxed) |
307 | 2 | } |
308 | | |
309 | | #[inline] |
310 | 1 | pub fn set_worker_id(&self, worker_id: u64) { |
311 | 1 | self.worker_id.store(worker_id, Ordering::Relaxed); |
312 | 1 | } |
313 | | |
314 | | #[inline] |
315 | 13 | pub fn set_worker_type(&self, worker_type: WorkerType) { |
316 | 13 | self.worker_type.store(worker_type as u8, Ordering::Relaxed); |
317 | 13 | } |
318 | | |
319 | | #[inline] |
320 | 1 | pub async fn set_dns_tx(&self, tx: mpsc::Sender<InFlightTask>) { |
321 | 1 | self.dns_tx.lock().await.push(tx); |
322 | 1 | } |
323 | | |
324 | | #[inline] |
325 | 1 | pub async fn set_dns_rx(&self, rx: mpsc::Receiver<InFlightTask>) { |
326 | 1 | self.dns_rx.lock().await.push(rx); |
327 | 1 | } |
328 | | |
329 | | #[inline] |
330 | 1 | pub fn set_stack_size_bytes(&self, stack_size_bytes: usize) { |
331 | 1 | self.stack_size_bytes |
332 | 1 | .store(stack_size_bytes, Ordering::Relaxed); |
333 | 1 | } |
334 | | |
335 | | #[inline] |
336 | 1 | pub fn set_buffer_budget_bytes(&self, buffer_budget_bytes: usize) { |
337 | 1 | self.buffer_budget_bytes |
338 | 1 | .store(buffer_budget_bytes, Ordering::Relaxed); |
339 | 1 | } |
340 | | |
341 | | #[inline] |
342 | 1 | pub fn set_max_stack_size_bytes(&self, max_stack_size_bytes: usize) { |
343 | 1 | self.max_stack_size_bytes |
344 | 1 | .store(max_stack_size_bytes, Ordering::Relaxed); |
345 | 1 | } |
346 | | |
347 | | #[inline] |
348 | 1 | pub fn set_max_buffer_budget_bytes(&self, max_buffer_budget_bytes: usize) { |
349 | 1 | self.max_buffer_budget_bytes |
350 | 1 | .store(max_buffer_budget_bytes, Ordering::Relaxed); |
351 | 1 | } |
352 | | |
353 | | #[inline] |
354 | 24 | pub fn set_state(&self, state: WorkerState) { |
355 | 24 | self.state.store(state as u8, Ordering::Relaxed); |
356 | 24 | } |
357 | | |
358 | | #[inline] |
359 | 1 | pub fn set_shutdown_requested(&self, shutdown_requested: bool) { |
360 | 1 | self.shutdown_requested |
361 | 1 | .store(shutdown_requested, Ordering::Relaxed); |
362 | 1 | } |
363 | | |
364 | | #[inline] |
365 | 2 | pub fn set_shutdown_mode(&self, shutdown_mode: ShutdownMode) { |
366 | 2 | self.shutdown_mode |
367 | 2 | .store(shutdown_mode as u8, Ordering::Relaxed); |
368 | 2 | } |
369 | | |
370 | | #[inline] |
371 | 1 | pub fn set_in_flight(&self, in_flight: usize) { |
372 | 1 | self.in_flight.store(in_flight, Ordering::Relaxed); |
373 | 1 | } |
374 | | |
375 | | #[inline] |
376 | 5 | pub fn set_max_in_flight(&self, max_in_flight: usize) { |
377 | 5 | self.max_in_flight.store(max_in_flight, Ordering::Relaxed); |
378 | 5 | } |
379 | | |
380 | | #[inline] |
381 | 1 | pub fn set_jobs_done(&self, jobs_done: u64) { |
382 | 1 | self.jobs_done.store(jobs_done, Ordering::Relaxed); |
383 | 1 | } |
384 | | |
385 | | #[inline] |
386 | 1 | pub fn set_jobs_failed(&self, jobs_failed: u64) { |
387 | 1 | self.jobs_failed.store(jobs_failed, Ordering::Relaxed); |
388 | 1 | } |
389 | | |
390 | | #[inline] |
391 | 1 | pub fn set_jobs_retried(&self, jobs_retried: u64) { |
392 | 1 | self.jobs_retried.store(jobs_retried, Ordering::Relaxed); |
393 | 1 | } |
394 | | |
395 | | #[inline] |
396 | 1 | pub fn set_last_job_started_ms(&self, last_job_started_ms: u64) { |
397 | 1 | self.last_job_started_ms |
398 | 1 | .store(last_job_started_ms, Ordering::Relaxed); |
399 | 1 | } |
400 | | |
401 | | #[inline] |
402 | 1 | pub fn set_last_job_finished_ms(&self, last_job_finished_ms: u64) { |
403 | 1 | self.last_job_finished_ms |
404 | 1 | .store(last_job_finished_ms, Ordering::Relaxed); |
405 | 1 | } |
406 | | |
407 | | #[inline] |
408 | 1 | pub fn set_last_error_code(&self, last_error_code: u64) { |
409 | 1 | self.last_error_code |
410 | 1 | .store(last_error_code, Ordering::Relaxed); |
411 | 1 | } |
412 | | |
413 | | #[inline] |
414 | 1 | pub fn set_last_error_at_ms(&self, last_error_at_ms: u64) { |
415 | 1 | self.last_error_at_ms |
416 | 1 | .store(last_error_at_ms, Ordering::Relaxed); |
417 | 1 | } |
418 | | |
419 | | #[inline] |
420 | 1 | pub fn set_last_task_id_hi(&self, last_task_id_hi: u64) { |
421 | 1 | self.last_task_id_hi |
422 | 1 | .store(last_task_id_hi, Ordering::Relaxed); |
423 | 1 | } |
424 | | |
425 | | #[inline] |
426 | 1 | pub fn set_last_task_id_lo(&self, last_task_id_lo: u64) { |
427 | 1 | self.last_task_id_lo |
428 | 1 | .store(last_task_id_lo, Ordering::Relaxed); |
429 | 1 | } |
430 | | } |
431 | | |
432 | | #[repr(u8)] |
433 | | #[allow(unused)] |
434 | | #[allow(non_camel_case_types)] |
435 | | #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Eq)] |
436 | | pub enum WorkerType { |
437 | | NONE = 99, |
438 | | LISTENER = 0, |
439 | | DECODER = 1, |
440 | | QUERY_DISPATCHER = 2, |
441 | | CACHE_LOOKUP = 3, |
442 | | ZONE_MANAGER = 4, |
443 | | RESOLVER = 5, |
444 | | CACHE_WRITER = 6, |
445 | | ENCODER = 7, |
446 | | SENDER = 8, |
447 | | |
448 | | CACHE_JANITOR = 9, |
449 | | |
450 | | METRICS = 10, |
451 | | TCP_ACCEPTOR = 11, |
452 | | } |
453 | | |
454 | | impl TryFrom<u8> for WorkerType { |
455 | | type Error = (); |
456 | | |
457 | 69 | fn try_from(v: u8) -> Result<Self, Self::Error> { |
458 | 69 | Ok(match v { |
459 | 7 | 0 => WorkerType::LISTENER, |
460 | 7 | 1 => WorkerType::DECODER, |
461 | 7 | 2 => WorkerType::QUERY_DISPATCHER, |
462 | 7 | 3 => WorkerType::CACHE_LOOKUP, |
463 | 7 | 4 => WorkerType::ZONE_MANAGER, |
464 | 7 | 5 => WorkerType::RESOLVER, |
465 | 7 | 6 => WorkerType::CACHE_WRITER, |
466 | 7 | 7 => WorkerType::ENCODER, |
467 | 4 | 8 => WorkerType::SENDER, |
468 | 1 | 9 => WorkerType::CACHE_JANITOR, |
469 | 1 | 10 => WorkerType::METRICS, |
470 | 5 | 11 => WorkerType::TCP_ACCEPTOR, |
471 | 2 | 99 => WorkerType::NONE, |
472 | | // TODO: return an SCloudException |
473 | 0 | _ => return Err(()), |
474 | | }) |
475 | 69 | } |
476 | | } |
477 | | |
478 | | #[repr(u8)] |
479 | | #[allow(unused)] |
480 | | #[allow(non_camel_case_types)] |
481 | | #[derive(Debug, PartialEq)] |
482 | | pub(crate) enum WorkerState { |
483 | | INIT = 0, |
484 | | IDLE = 1, |
485 | | BUSY = 2, |
486 | | PAUSED = 3, |
487 | | STOPPING = 4, |
488 | | STOPPED = 5, |
489 | | } |
490 | | |
491 | | impl TryFrom<u8> for WorkerState { |
492 | | type Error = (); |
493 | | |
494 | 6 | fn try_from(v: u8) -> Result<Self, Self::Error> { |
495 | 6 | Ok(match v { |
496 | 1 | 0 => WorkerState::INIT, |
497 | 1 | 1 => WorkerState::IDLE, |
498 | 1 | 2 => WorkerState::BUSY, |
499 | 1 | 3 => WorkerState::PAUSED, |
500 | 1 | 4 => WorkerState::STOPPING, |
501 | 1 | 5 => WorkerState::STOPPED, |
502 | | // TODO: return an SCloudException |
503 | 0 | _ => return Err(()), |
504 | | }) |
505 | 6 | } |
506 | | } |
507 | | |
508 | | #[repr(u8)] |
509 | | #[allow(unused)] |
510 | | #[allow(non_camel_case_types)] |
511 | | #[derive(Debug, PartialEq)] |
512 | | pub(crate) enum ShutdownMode { |
513 | | GRACEFUL = 0, |
514 | | IMMEDIATE = 1, |
515 | | } |
516 | | |
517 | | impl TryFrom<u8> for ShutdownMode { |
518 | | type Error = (); |
519 | | |
520 | 2 | fn try_from(v: u8) -> Result<Self, Self::Error> { |
521 | 2 | Ok(match v { |
522 | 1 | 0 => ShutdownMode::GRACEFUL, |
523 | 1 | 1 => ShutdownMode::IMMEDIATE, |
524 | | // TODO: return an SCloudException |
525 | 0 | _ => return Err(()), |
526 | | }) |
527 | 2 | } |
528 | | } |
529 | | |
530 | 0 | pub fn spawn_worker( |
531 | 0 | worker: Arc<SCloudWorker>, |
532 | 0 | gate: Arc<StartGate>, |
533 | 0 | ) -> tokio::task::JoinHandle<()> { |
534 | 0 | tokio::spawn(async move { |
535 | 0 | gate.wait_turn(worker.get_worker_id()).await; |
536 | | |
537 | 0 | if let Err(e) = worker.clone().run(Some(gate.clone())).await { |
538 | 0 | log_error!("Worker {} failed: {:?}", worker.get_worker_id(), e); |
539 | 0 | } |
540 | | |
541 | 0 | gate.done().await; |
542 | 0 | }) |
543 | 0 | } |